LSTM and GRU for Time Series Forecasting (from scratch + PyTorch)#
Recurrent neural networks model sequences by carrying state forward in time. LSTMs and GRUs add gates that make it much easier to learn long-range patterns (seasonality, delayed effects, regime changes) without gradients vanishing/exploding.
In this notebook you will:
Build intuition for memory + gates in LSTM/GRU
Implement a single-layer LSTM and single-layer GRU in pure NumPy (forward + backprop through time)
Train on a small forecasting task and visualize predictions
Reproduce the same setup using PyTorch (nn.LSTM, nn.GRU)
Visualize learned dynamics with Plotly
import sys
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
import os
import plotly.io as pio
from plotly.subplots import make_subplots
import torch
from torch import nn
from torch.utils.data import DataLoader, Dataset
pio.templates.default = "plotly_white"
pio.renderers.default = os.environ.get("PLOTLY_RENDERER", "notebook")
np.set_printoptions(precision=4, suppress=True)
SEED = 42
rng = np.random.default_rng(SEED)
torch.manual_seed(SEED)
print("Python:", sys.version.split()[0])
print("NumPy:", np.__version__)
import plotly
print("Plotly:", plotly.__version__)
print("PyTorch:", torch.__version__)
Python: 3.12.9
NumPy: 1.26.2
Plotly: 6.5.2
PyTorch: 2.7.0+cu126
0) Problem setup: one-step-ahead forecasting#
We’ll learn a mapping:
Input: a window \((x_{t-L+1}, \dots, x_t)\) of length \(L\)
Target: the next value \(x_{t+1}\)
This is the simplest useful forecasting setup and keeps the math in the from-scratch section readable.
def make_toy_series(n: int = 800, noise: float = 0.15, seed: int = 42):
rng_local = np.random.default_rng(seed)
t = np.arange(n, dtype=float)
# A mix of seasonality + trend + noise (toy, but non-trivial)
y = (
1.2 * np.sin(2 * np.pi * t / 50)
+ 0.6 * np.sin(2 * np.pi * t / 17 + 0.8)
+ 0.0025 * t
)
y += noise * rng_local.normal(size=n)
return t, y
def make_windows(series_1d: np.ndarray, seq_len: int):
X = []
y = []
target_idx = []
for start in range(0, len(series_1d) - seq_len):
end = start + seq_len
X.append(series_1d[start:end])
y.append(series_1d[end])
target_idx.append(end)
X = np.asarray(X, dtype=float)[:, :, None] # (N, T, 1)
y = np.asarray(y, dtype=float)[:, None] # (N, 1)
target_idx = np.asarray(target_idx, dtype=int)
return X, y, target_idx
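The loop above is easy to read; for larger series, NumPy's `sliding_window_view` builds the same windows without a Python loop. A small standalone sketch (names here are illustrative) that should match the loop's output for the same window length:

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

series = np.arange(10, dtype=float)      # toy 1-D series
L = 4                                    # window length

# Windows of length L+1: the first L entries are inputs, the last is the target.
w = sliding_window_view(series, L + 1)   # (n - L, L + 1)
X = w[:, :-1][:, :, None]                # (N, T, 1)
y = w[:, -1:]                            # (N, 1)
print(X.shape, y.shape)                  # (6, 4, 1) (6, 1)
```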
t, y = make_toy_series(n=900, noise=0.18, seed=SEED)
split_t = int(0.7 * len(y))
train_mean = float(y[:split_t].mean())
train_std = float(y[:split_t].std() + 1e-8)
y_scaled = (y - train_mean) / train_std
SEQ_LEN = 60
X_all, y_all, target_idx = make_windows(y_scaled, seq_len=SEQ_LEN)
train_mask = target_idx < split_t
test_mask = ~train_mask
X_train, y_train = X_all[train_mask], y_all[train_mask]
X_test, y_test = X_all[test_mask], y_all[test_mask]
t_train = t[target_idx[train_mask]]
t_test = t[target_idx[test_mask]]
X_train.shape, y_train.shape, X_test.shape, y_test.shape
((570, 60, 1), (570, 1), (270, 60, 1), (270, 1))
fig = go.Figure()
fig.add_trace(go.Scatter(x=t, y=y, mode="lines", name="series"))
fig.add_vline(x=t[split_t], line_dash="dash", line_color="black")
fig.update_layout(
title="Toy time series (train/test split)",
xaxis_title="time index",
yaxis_title="value",
legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1),
)
fig.show()
1) Why gated RNNs (LSTM/GRU) help#
A vanilla RNN updates its hidden state as
\[
h_t = \tanh(W_x x_t + W_h h_{t-1} + b)
\]
and gradients backpropagate through many repeated Jacobian multiplications. Over long sequences this often leads to:
vanishing gradients: early time steps get almost no learning signal
exploding gradients: unstable updates without clipping
Gates make it easier for the model to choose what to keep, overwrite, or expose at each step.
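The geometric decay/growth is easy to see in a scalar toy model (a hypothetical one-unit RNN, not the forecaster below): the gradient of \(h_T\) with respect to \(h_0\) is a product of \(T\) per-step factors \(w \cdot (1 - h_t^2)\).

```python
import numpy as np

def grad_through_time(w: float, T: int, x: float = 0.0) -> float:
    """d h_T / d h_0 for the scalar RNN h_t = tanh(w * h_{t-1} + x)."""
    h, grad = 0.0, 1.0
    for _ in range(T):
        h = np.tanh(w * h + x)
        grad *= w * (1.0 - h * h)  # one Jacobian factor per step
    return float(grad)

print(grad_through_time(0.9, T=100))  # ~2.7e-5: the signal from step 0 has vanished
print(grad_through_time(1.5, T=100))  # ~4.1e17: explodes without clipping
```

With nonzero inputs the tanh saturation changes the individual factors, but the geometric behavior persists.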
2) LSTM: explicit memory cell#
An LSTM keeps two states:
cell state \(c_t\) (long-term memory)
hidden state \(h_t\) (what the model exposes to the next layer / output)
With \(z_t = [x_t, h_{t-1}]\) (concatenation), the core equations are:
\[
\begin{aligned}
i_t &= \sigma(z_t W_i + b_i) \\
f_t &= \sigma(z_t W_f + b_f) \\
o_t &= \sigma(z_t W_o + b_o) \\
g_t &= \tanh(z_t W_g + b_g) \\
c_t &= f_t \odot c_{t-1} + i_t \odot g_t \\
h_t &= o_t \odot \tanh(c_t)
\end{aligned}
\]
Intuition:
\(f_t\) chooses what memory to keep from \(c_{t-1}\)
\(i_t\) chooses how much new information to write
\(o_t\) chooses how much memory to expose to \(h_t\)
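A single LSTM step in isolation makes the gating concrete. This is a minimal sketch with toy shapes and random placeholder weights (the per-gate matrices act on the concatenated \([x_t, h_{t-1}]\), as in the class below):

```python
import numpy as np

rng = np.random.default_rng(0)
B, D, H = 2, 1, 4                      # batch, input dim, hidden dim
x_t = rng.normal(size=(B, D))
h_prev = np.zeros((B, H))
c_prev = np.zeros((B, H))

def sigmoid(a):
    return 1.0 / (1.0 + np.exp(-a))

# One weight matrix per gate, acting on the concatenated [x_t, h_prev].
W = {k: rng.normal(scale=0.1, size=(D + H, H)) for k in "ifog"}
b = {k: np.zeros(H) for k in "ifog"}

z = np.concatenate([x_t, h_prev], axis=1)  # (B, D+H)
i = sigmoid(z @ W["i"] + b["i"])           # input gate, values in (0, 1)
f = sigmoid(z @ W["f"] + b["f"])           # forget gate
o = sigmoid(z @ W["o"] + b["o"])           # output gate
g = np.tanh(z @ W["g"] + b["g"])           # candidate memory, values in (-1, 1)
c_t = f * c_prev + i * g                   # write memory
h_t = o * np.tanh(c_t)                     # expose memory
print(h_t.shape, c_t.shape)
```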
3) GRU: fewer gates, no separate cell state#
A GRU merges the cell and hidden state and uses two gates:
\[
\begin{aligned}
z_t &= \sigma([x_t, h_{t-1}] W_z + b_z) \\
r_t &= \sigma([x_t, h_{t-1}] W_r + b_r) \\
\tilde h_t &= \tanh([x_t, r_t \odot h_{t-1}] W_h + b_h) \\
h_t &= (1 - z_t) \odot h_{t-1} + z_t \odot \tilde h_t
\end{aligned}
\]
Intuition:
\(z_t\) interpolates between keeping the old state vs writing the candidate
\(r_t\) decides how much of the old state to use when forming the candidate
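One GRU step, again with toy shapes and random placeholder weights, including a quick check of the interpolation property (\(z_t = 0\) passes the old state through unchanged):

```python
import numpy as np

rng = np.random.default_rng(0)
B, D, H = 2, 1, 4
x_t = rng.normal(size=(B, D))
h_prev = rng.normal(size=(B, H))

def sigmoid(a):
    return 1.0 / (1.0 + np.exp(-a))

Wz, Wr, Wh = (rng.normal(scale=0.1, size=(D + H, H)) for _ in range(3))

concat = np.concatenate([x_t, h_prev], axis=1)
z = sigmoid(concat @ Wz)                               # update gate
r = sigmoid(concat @ Wr)                               # reset gate
h_tilde = np.tanh(np.concatenate([x_t, r * h_prev], axis=1) @ Wh)
h_t = (1.0 - z) * h_prev + z * h_tilde                 # interpolate old vs candidate

# With z forced to 0, the old state passes through exactly:
h_keep = (1.0 - 0.0) * h_prev + 0.0 * h_tilde
assert np.allclose(h_keep, h_prev)
print(h_t.shape)
```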
4) NumPy from scratch (forward + BPTT)#
We’ll implement two tiny forecasters:
NumpyLSTMForecaster
NumpyGRUForecaster
Both:
take X shaped (batch, seq_len, 1)
run a single recurrent layer
predict the next value via a linear head on the last hidden state
We’ll also add:
Adam optimizer (NumPy)
gradient clipping (important for RNNs)
def sigmoid(x: np.ndarray) -> np.ndarray:
x = np.clip(x, -60, 60)
return 1.0 / (1.0 + np.exp(-x))
def mse_loss(y_hat: np.ndarray, y: np.ndarray):
diff = y_hat - y
loss = float(np.mean(diff**2))
dY = (2.0 / diff.size) * diff
return loss, dY
def clip_grads_(grads: dict[str, np.ndarray], max_norm: float = 1.0) -> float:
total_sq = 0.0
for g in grads.values():
total_sq += float(np.sum(g * g))
total_norm = float(np.sqrt(total_sq))
if total_norm > max_norm:
scale = max_norm / (total_norm + 1e-12)
for k in grads:
grads[k] *= scale
return total_norm
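What global-norm clipping does, shown standalone on dummy gradients: all tensors are rescaled by the same factor, so the update direction is preserved while the combined norm is capped.

```python
import numpy as np

grads = {"W": np.full((2, 2), 3.0), "b": np.full((2,), 4.0)}
max_norm = 1.0

# Global norm across all tensors: sqrt(4*9 + 2*16) = sqrt(68) ~ 8.25
total = float(np.sqrt(sum(np.sum(g * g) for g in grads.values())))
if total > max_norm:
    scale = max_norm / total
    grads = {k: g * scale for k, g in grads.items()}  # same scale for every tensor

new_total = float(np.sqrt(sum(np.sum(g * g) for g in grads.values())))
print(round(new_total, 6))  # -> 1.0
```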
def init_weight(rng: np.random.Generator, fan_in: int, fan_out: int):
# Xavier/Glorot uniform
limit = np.sqrt(6.0 / (fan_in + fan_out))
return rng.uniform(-limit, limit, size=(fan_in, fan_out)).astype(float)
class Adam:
def __init__(
self,
params: dict[str, np.ndarray],
lr: float = 1e-3,
betas: tuple[float, float] = (0.9, 0.999),
eps: float = 1e-8,
):
self.lr = lr
self.beta1, self.beta2 = betas
self.eps = eps
self.t = 0
self.m = {k: np.zeros_like(v) for k, v in params.items()}
self.v = {k: np.zeros_like(v) for k, v in params.items()}
def step(self, params: dict[str, np.ndarray], grads: dict[str, np.ndarray]):
self.t += 1
lr_t = self.lr * (np.sqrt(1.0 - self.beta2**self.t) / (1.0 - self.beta1**self.t))
for k in params:
g = grads[k]
self.m[k] = self.beta1 * self.m[k] + (1.0 - self.beta1) * g
self.v[k] = self.beta2 * self.v[k] + (1.0 - self.beta2) * (g * g)
params[k] -= lr_t * self.m[k] / (np.sqrt(self.v[k]) + self.eps)
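The `lr_t` line folds Adam's bias corrections for `m` and `v` into the step size, since \(\hat m / \sqrt{\hat v} = (m / \sqrt{v}) \cdot \sqrt{1 - \beta_2^t} / (1 - \beta_1^t)\). A quick look at how this effective rate behaves:

```python
import math

lr, beta1, beta2 = 1e-3, 0.9, 0.999

def lr_t(t: int) -> float:
    # Effective step size with both bias corrections folded in.
    return lr * math.sqrt(1.0 - beta2**t) / (1.0 - beta1**t)

for t in [1, 10, 100, 1000]:
    print(t, lr_t(t))  # starts below lr, approaches lr for large t
```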
4.1) NumPy LSTM forecaster#
This is a single-layer LSTM with a linear prediction head.
Implementation notes:
We store per-timestep intermediates (gates, states, concatenated input) in a cache.
Backprop is standard BPTT over that cache.
class NumpyLSTMForecaster:
def __init__(self, input_size: int, hidden_size: int, seed: int = 42):
self.input_size = input_size
self.hidden_size = hidden_size
rng_local = np.random.default_rng(seed)
fan_in = input_size + hidden_size
self.params = {
'W_i': init_weight(rng_local, fan_in, hidden_size),
'b_i': np.zeros((hidden_size,), dtype=float),
'W_f': init_weight(rng_local, fan_in, hidden_size),
'b_f': np.zeros((hidden_size,), dtype=float),
'W_o': init_weight(rng_local, fan_in, hidden_size),
'b_o': np.zeros((hidden_size,), dtype=float),
'W_g': init_weight(rng_local, fan_in, hidden_size),
'b_g': np.zeros((hidden_size,), dtype=float),
'W_y': init_weight(rng_local, hidden_size, 1),
'b_y': np.zeros((1,), dtype=float),
}
self.cache = None
self.last_h = None
def forward(self, X: np.ndarray) -> np.ndarray:
# X: (B, T, D)
B, T, D = X.shape
H = self.hidden_size
h = np.zeros((B, H), dtype=float)
c = np.zeros((B, H), dtype=float)
caches = []
p = self.params
for t in range(T):
x_t = X[:, t, :]
z = np.concatenate([x_t, h], axis=1) # (B, D+H)
i = sigmoid(z @ p['W_i'] + p['b_i'])
f = sigmoid(z @ p['W_f'] + p['b_f'])
o = sigmoid(z @ p['W_o'] + p['b_o'])
g = np.tanh(z @ p['W_g'] + p['b_g'])
c_next = f * c + i * g
tanh_c = np.tanh(c_next)
h_next = o * tanh_c
caches.append(
{
'z': z,
'i': i,
'f': f,
'o': o,
'g': g,
'c_prev': c,
'c': c_next,
'tanh_c': tanh_c,
}
)
h, c = h_next, c_next
y_hat = h @ p['W_y'] + p['b_y']
self.cache = caches
self.last_h = h
return y_hat
def backward(self, dY: np.ndarray) -> dict[str, np.ndarray]:
assert self.cache is not None and self.last_h is not None
p = self.params
grads = {k: np.zeros_like(v) for k, v in p.items()}
grads['W_y'] = self.last_h.T @ dY
grads['b_y'] = np.sum(dY, axis=0)
dh = dY @ p['W_y'].T
dc = np.zeros_like(dh)
D = self.input_size
for step in reversed(self.cache):
z = step['z']
i = step['i']
f = step['f']
o = step['o']
g = step['g']
c_prev = step['c_prev']
c = step['c']
tanh_c = step['tanh_c']
# h = o * tanh(c)
do = dh * tanh_c
dc_total = dc + dh * o * (1.0 - tanh_c * tanh_c)
# c = f*c_prev + i*g
df = dc_total * c_prev
di = dc_total * g
dg = dc_total * i
dc = dc_total * f
# gate pre-activations
dai = di * i * (1.0 - i)
daf = df * f * (1.0 - f)
dao = do * o * (1.0 - o)
dag = dg * (1.0 - g * g)
grads['W_i'] += z.T @ dai
grads['b_i'] += np.sum(dai, axis=0)
grads['W_f'] += z.T @ daf
grads['b_f'] += np.sum(daf, axis=0)
grads['W_o'] += z.T @ dao
grads['b_o'] += np.sum(dao, axis=0)
grads['W_g'] += z.T @ dag
grads['b_g'] += np.sum(dag, axis=0)
dz = (
dai @ p['W_i'].T
+ daf @ p['W_f'].T
+ dao @ p['W_o'].T
+ dag @ p['W_g'].T
)
dh = dz[:, D:] # propagate to previous hidden state
return grads
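BPTT code is easy to get subtly wrong, so it pays to compare analytic gradients against central differences. A minimal sketch of the recipe on a small sigmoid layer; the same pattern applies to the forecasters above by perturbing single entries of `model.params` and re-running `forward`:

```python
import numpy as np

rng = np.random.default_rng(0)
X = rng.normal(size=(5, 3))
W = rng.normal(size=(3, 2))
y = rng.normal(size=(5, 2))

def sigmoid(a):
    return 1.0 / (1.0 + np.exp(-a))

def loss_fn(W):
    return float(np.mean((sigmoid(X @ W) - y) ** 2))

# Analytic gradient of the MSE through the sigmoid layer.
s = sigmoid(X @ W)
dY = (2.0 / s.size) * (s - y)
grad = X.T @ (dY * s * (1.0 - s))

# Central differences on a few entries.
eps = 1e-6
for idx in [(0, 0), (1, 1), (2, 0)]:
    Wp, Wm = W.copy(), W.copy()
    Wp[idx] += eps
    Wm[idx] -= eps
    num = (loss_fn(Wp) - loss_fn(Wm)) / (2 * eps)
    assert abs(num - grad[idx]) < 1e-6, (idx, num, grad[idx])
print("gradient check passed")
```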
def train_numpy_model(
model,
X_train: np.ndarray,
y_train: np.ndarray,
*,
epochs: int = 120,
batch_size: int = 64,
lr: float = 2e-3,
max_grad_norm: float = 1.0,
seed: int = 42,
verbose_every: int = 20,
):
rng_local = np.random.default_rng(seed)
opt = Adam(model.params, lr=lr)
history = []
n = len(X_train)
for epoch in range(1, epochs + 1):
idx = rng_local.permutation(n)
total_loss = 0.0
for start in range(0, n, batch_size):
bidx = idx[start : start + batch_size]
Xb = X_train[bidx]
yb = y_train[bidx]
y_hat = model.forward(Xb)
loss, dY = mse_loss(y_hat, yb)
grads = model.backward(dY)
grad_norm = clip_grads_(grads, max_norm=max_grad_norm)
opt.step(model.params, grads)
total_loss += loss * len(bidx)
avg = total_loss / n
history.append(avg)
if epoch == 1 or epoch % verbose_every == 0 or epoch == epochs:
print(f"epoch {epoch:>4}/{epochs} | loss={avg:.6f} | grad_norm={grad_norm:.3f}")
return np.asarray(history, dtype=float)
def predict_numpy(model, X: np.ndarray) -> np.ndarray:
return model.forward(X)
def unscale(x_scaled: np.ndarray) -> np.ndarray:
return x_scaled * train_std + train_mean
FAST_RUN = True
EPOCHS_NUMPY = 80 if FAST_RUN else 200
HIDDEN = 32
lstm_np = NumpyLSTMForecaster(input_size=1, hidden_size=HIDDEN, seed=SEED)
hist_lstm_np = train_numpy_model(
lstm_np,
X_train,
y_train,
epochs=EPOCHS_NUMPY,
batch_size=64,
lr=2e-3,
max_grad_norm=1.0,
seed=SEED,
verbose_every=20,
)
fig = px.line(y=hist_lstm_np, title="NumPy LSTM: training loss", labels={"x": "epoch", "y": "MSE"})
fig.show()
epoch 1/80 | loss=0.949535 | grad_norm=1.457
epoch 20/80 | loss=0.063779 | grad_norm=0.180
epoch 40/80 | loss=0.046973 | grad_norm=0.392
epoch 60/80 | loss=0.046404 | grad_norm=0.322
epoch 80/80 | loss=0.038749 | grad_norm=0.299
y_pred_test_lstm = unscale(predict_numpy(lstm_np, X_test))[:, 0]
y_true_test = unscale(y_test)[:, 0]
fig = go.Figure()
fig.add_trace(go.Scatter(x=t_test, y=y_true_test, mode="lines", name="true"))
fig.add_trace(go.Scatter(x=t_test, y=y_pred_test_lstm, mode="lines", name="NumPy LSTM"))
fig.update_layout(
title="One-step-ahead predictions on the test region (NumPy LSTM)",
xaxis_title="time index",
yaxis_title="value",
legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1),
)
fig.show()
mse = float(np.mean((y_pred_test_lstm - y_true_test) ** 2))
mae = float(np.mean(np.abs(y_pred_test_lstm - y_true_test)))
print(f"Test MSE: {mse:.4f}")
print(f"Test MAE: {mae:.4f}")
Test MSE: 0.1178
Test MAE: 0.2782
def rollout_forecast_numpy(model, seed_window_scaled_1d: np.ndarray, steps: int):
window = seed_window_scaled_1d.astype(float).copy()
preds = []
for _ in range(steps):
y_hat = model.forward(window[None, :, None])[0, 0]
preds.append(float(y_hat))
window = np.concatenate([window[1:], [y_hat]])
return np.asarray(preds, dtype=float)
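The window mechanics of the rollout are easier to see with a stub predictor (a hypothetical stand-in for the trained model): each prediction is appended to the window and the oldest value is dropped, so errors feed back into later inputs.

```python
import numpy as np

# Stub "model": predicts the mean of the window (stands in for a trained RNN).
def predict(window: np.ndarray) -> float:
    return float(window.mean())

window = np.array([1.0, 2.0, 3.0, 4.0])
preds = []
for _ in range(3):
    y_hat = predict(window)
    preds.append(y_hat)
    window = np.concatenate([window[1:], [y_hat]])  # slide: drop oldest, append prediction
print(preds)  # [2.5, 2.875, 3.09375]
```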
# Roll out recursively from the first test window
roll_steps = 200
seed_window = y_scaled[split_t - SEQ_LEN : split_t]
future_true = y[split_t : split_t + roll_steps]
future_pred_scaled = rollout_forecast_numpy(lstm_np, seed_window, steps=roll_steps)
future_pred = unscale(future_pred_scaled)
fig = go.Figure()
fig.add_trace(go.Scatter(x=t[split_t : split_t + roll_steps], y=future_true, mode="lines", name="true"))
fig.add_trace(
go.Scatter(
x=t[split_t : split_t + roll_steps],
y=future_pred,
mode="lines",
name="NumPy LSTM (rollout)",
)
)
fig.update_layout(
title="Recursive rollout forecast (NumPy LSTM)",
xaxis_title="time index",
yaxis_title="value",
legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1),
)
fig.show()
Visualizing LSTM gates#
To make the LSTM less of a black box, we can inspect the gate activations for a single input window.
The plots below show hidden units (rows) × time steps (columns).
def lstm_gate_mats(model: NumpyLSTMForecaster, X_one: np.ndarray):
_ = model.forward(X_one)
assert model.cache is not None
i = np.stack([step['i'][0] for step in model.cache], axis=0).T
f = np.stack([step['f'][0] for step in model.cache], axis=0).T
o = np.stack([step['o'][0] for step in model.cache], axis=0).T
g = np.stack([step['g'][0] for step in model.cache], axis=0).T
c = np.stack([step['c'][0] for step in model.cache], axis=0).T
return i, f, o, g, c
X_one = X_test[:1]
i, f, o, g, c = lstm_gate_mats(lstm_np, X_one)
fig = make_subplots(
rows=2,
cols=2,
subplot_titles=["input gate i", "forget gate f", "output gate o", "cell state c"],
horizontal_spacing=0.08,
vertical_spacing=0.12,
)
fig.add_trace(go.Heatmap(z=i, coloraxis="coloraxis"), row=1, col=1)
fig.add_trace(go.Heatmap(z=f, coloraxis="coloraxis"), row=1, col=2)
fig.add_trace(go.Heatmap(z=o, coloraxis="coloraxis"), row=2, col=1)
fig.add_trace(go.Heatmap(z=c, coloraxis="coloraxis"), row=2, col=2)
fig.update_layout(
title="NumPy LSTM: gate activations for one window",
height=750,
coloraxis=dict(colorscale="Viridis"),
)
fig.update_xaxes(title_text="time step")
fig.update_yaxes(title_text="hidden unit")
fig.show()
4.2) NumPy GRU forecaster#
The GRU is a bit simpler than the LSTM: fewer gates and no separate cell state.
class NumpyGRUForecaster:
def __init__(self, input_size: int, hidden_size: int, seed: int = 42):
self.input_size = input_size
self.hidden_size = hidden_size
rng_local = np.random.default_rng(seed)
fan_in = input_size + hidden_size
self.params = {
'W_z': init_weight(rng_local, fan_in, hidden_size),
'b_z': np.zeros((hidden_size,), dtype=float),
'W_r': init_weight(rng_local, fan_in, hidden_size),
'b_r': np.zeros((hidden_size,), dtype=float),
'W_h': init_weight(rng_local, fan_in, hidden_size),
'b_h': np.zeros((hidden_size,), dtype=float),
'W_y': init_weight(rng_local, hidden_size, 1),
'b_y': np.zeros((1,), dtype=float),
}
self.cache = None
self.last_h = None
def forward(self, X: np.ndarray) -> np.ndarray:
B, T, D = X.shape
H = self.hidden_size
h = np.zeros((B, H), dtype=float)
caches = []
p = self.params
for t in range(T):
x_t = X[:, t, :]
concat = np.concatenate([x_t, h], axis=1)
z = sigmoid(concat @ p['W_z'] + p['b_z'])
r = sigmoid(concat @ p['W_r'] + p['b_r'])
concat2 = np.concatenate([x_t, r * h], axis=1)
h_tilde = np.tanh(concat2 @ p['W_h'] + p['b_h'])
h_next = (1.0 - z) * h + z * h_tilde
caches.append(
{
'x_t': x_t,
'h_prev': h,
'concat': concat,
'concat2': concat2,
'z': z,
'r': r,
'h_tilde': h_tilde,
}
)
h = h_next
y_hat = h @ p['W_y'] + p['b_y']
self.cache = caches
self.last_h = h
return y_hat
def backward(self, dY: np.ndarray) -> dict[str, np.ndarray]:
assert self.cache is not None and self.last_h is not None
p = self.params
grads = {k: np.zeros_like(v) for k, v in p.items()}
grads['W_y'] = self.last_h.T @ dY
grads['b_y'] = np.sum(dY, axis=0)
dh = dY @ p['W_y'].T
D = self.input_size
for step in reversed(self.cache):
h_prev = step['h_prev']
concat = step['concat']
concat2 = step['concat2']
z = step['z']
r = step['r']
h_tilde = step['h_tilde']
# h = (1-z)*h_prev + z*h_tilde
dh_tilde = dh * z
dz = dh * (h_tilde - h_prev)
dh_prev = dh * (1.0 - z)
# h_tilde = tanh(concat2 @ W_h + b_h)
da_h = dh_tilde * (1.0 - h_tilde * h_tilde)
grads['W_h'] += concat2.T @ da_h
grads['b_h'] += np.sum(da_h, axis=0)
dconcat2 = da_h @ p['W_h'].T
drh_prev = dconcat2[:, D:]
dr = drh_prev * h_prev
dh_prev += drh_prev * r
# r gate
da_r = dr * r * (1.0 - r)
grads['W_r'] += concat.T @ da_r
grads['b_r'] += np.sum(da_r, axis=0)
dconcat_r = da_r @ p['W_r'].T
# z gate
da_z = dz * z * (1.0 - z)
grads['W_z'] += concat.T @ da_z
grads['b_z'] += np.sum(da_z, axis=0)
dconcat_z = da_z @ p['W_z'].T
dconcat = dconcat_r + dconcat_z
dh_prev += dconcat[:, D:]
dh = dh_prev
return grads
gru_np = NumpyGRUForecaster(input_size=1, hidden_size=HIDDEN, seed=SEED)
hist_gru_np = train_numpy_model(
gru_np,
X_train,
y_train,
epochs=EPOCHS_NUMPY,
batch_size=64,
lr=2e-3,
max_grad_norm=1.0,
seed=SEED,
verbose_every=20,
)
fig = px.line(y=hist_gru_np, title="NumPy GRU: training loss", labels={"x": "epoch", "y": "MSE"})
fig.show()
epoch 1/80 | loss=0.750055 | grad_norm=2.316
epoch 20/80 | loss=0.064718 | grad_norm=0.312
epoch 40/80 | loss=0.047928 | grad_norm=0.152
epoch 60/80 | loss=0.046996 | grad_norm=0.416
epoch 80/80 | loss=0.038497 | grad_norm=0.290
y_pred_test_gru = unscale(predict_numpy(gru_np, X_test))[:, 0]
fig = go.Figure()
fig.add_trace(go.Scatter(x=t_test, y=y_true_test, mode="lines", name="true"))
fig.add_trace(go.Scatter(x=t_test, y=y_pred_test_gru, mode="lines", name="NumPy GRU"))
fig.update_layout(
title="One-step-ahead predictions on the test region (NumPy GRU)",
xaxis_title="time index",
yaxis_title="value",
legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1),
)
fig.show()
mse = float(np.mean((y_pred_test_gru - y_true_test) ** 2))
mae = float(np.mean(np.abs(y_pred_test_gru - y_true_test)))
print(f"Test MSE: {mse:.4f}")
print(f"Test MAE: {mae:.4f}")
Test MSE: 0.0882
Test MAE: 0.2428
Visualizing GRU gates#
We’ll inspect:
update gate \(z_t\)
reset gate \(r_t\)
def gru_gate_mats(model: NumpyGRUForecaster, X_one: np.ndarray):
_ = model.forward(X_one)
assert model.cache is not None
z = np.stack([step['z'][0] for step in model.cache], axis=0).T
r = np.stack([step['r'][0] for step in model.cache], axis=0).T
h_tilde = np.stack([step['h_tilde'][0] for step in model.cache], axis=0).T
return z, r, h_tilde
z, r, h_tilde = gru_gate_mats(gru_np, X_one)
fig = make_subplots(
rows=1,
cols=3,
subplot_titles=["update gate z", "reset gate r", "candidate h~"],
horizontal_spacing=0.06,
)
fig.add_trace(go.Heatmap(z=z, coloraxis="coloraxis"), row=1, col=1)
fig.add_trace(go.Heatmap(z=r, coloraxis="coloraxis"), row=1, col=2)
fig.add_trace(go.Heatmap(z=h_tilde, coloraxis="coloraxis"), row=1, col=3)
fig.update_layout(
title="NumPy GRU: gate activations for one window",
height=380,
coloraxis=dict(colorscale="Viridis"),
)
fig.update_xaxes(title_text="time step")
fig.update_yaxes(title_text="hidden unit")
fig.show()
5) PyTorch: the same idea with nn.LSTM / nn.GRU#
PyTorch takes care of:
parameter initialization
BPTT + autograd
efficient kernels
We’ll keep the exact same dataset and model shape to make the comparison fair.
class WindowDataset(Dataset):
def __init__(self, X: np.ndarray, y: np.ndarray):
self.X = torch.tensor(X, dtype=torch.float32)
self.y = torch.tensor(y, dtype=torch.float32)
def __len__(self) -> int:
return self.X.shape[0]
def __getitem__(self, idx: int):
return self.X[idx], self.y[idx]
class TorchRNNForecaster(nn.Module):
def __init__(self, rnn_type: str, input_size: int = 1, hidden_size: int = 32):
super().__init__()
rnn_type = rnn_type.lower()
if rnn_type == 'lstm':
self.rnn = nn.LSTM(input_size, hidden_size, batch_first=True)
elif rnn_type == 'gru':
self.rnn = nn.GRU(input_size, hidden_size, batch_first=True)
else:
raise ValueError("rnn_type must be 'lstm' or 'gru'")
self.head = nn.Linear(hidden_size, 1)
def forward(self, x: torch.Tensor) -> torch.Tensor:
out, _ = self.rnn(x)
h_last = out[:, -1, :]
return self.head(h_last)
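A quick shape sanity check for this layout, assuming `batch_first=True` tensors of shape `(batch, seq_len, features)`. For a single-layer, unidirectional LSTM, the last time step of `out` equals the final hidden state `h_n`:

```python
import torch
from torch import nn

rnn = nn.LSTM(input_size=1, hidden_size=32, batch_first=True)
head = nn.Linear(32, 1)

x = torch.randn(8, 60, 1)          # (batch, seq_len, features)
out, (h_n, c_n) = rnn(x)           # out: (batch, seq_len, hidden), h_n: (1, batch, hidden)
pred = head(out[:, -1, :])         # last hidden state -> one-step prediction

assert torch.allclose(out[:, -1, :], h_n[0])  # same tensor, two views
print(out.shape, h_n.shape, pred.shape)
```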
def train_torch_model(
model: nn.Module,
loader: DataLoader,
*,
epochs: int = 40,
lr: float = 1e-3,
max_grad_norm: float = 1.0,
device: str = 'cpu',
verbose_every: int = 10,
):
model.to(device)
opt = torch.optim.Adam(model.parameters(), lr=lr)
loss_fn = nn.MSELoss()
history = []
for epoch in range(1, epochs + 1):
model.train()
total = 0.0
for xb, yb in loader:
xb = xb.to(device)
yb = yb.to(device)
opt.zero_grad(set_to_none=True)
pred = model(xb)
loss = loss_fn(pred, yb)
loss.backward()
nn.utils.clip_grad_norm_(model.parameters(), max_grad_norm)
opt.step()
total += float(loss.item()) * xb.shape[0]
avg = total / len(loader.dataset)
history.append(avg)
if epoch == 1 or epoch % verbose_every == 0 or epoch == epochs:
print(f"epoch {epoch:>4}/{epochs} | loss={avg:.6f}")
return np.asarray(history, dtype=float)
def predict_torch(model: nn.Module, X: np.ndarray, device: str = 'cpu') -> np.ndarray:
model.eval()
with torch.no_grad():
preds = model(torch.tensor(X, dtype=torch.float32, device=device)).cpu().numpy()
return preds
import warnings
def pick_device() -> str:
with warnings.catch_warnings():
warnings.filterwarnings("ignore", message="CUDA initialization.*")
try:
if torch.cuda.is_available():
return "cuda"
except Exception:
pass
return "cpu"
device = pick_device()
print('device:', device)
EPOCHS_TORCH = 40 if FAST_RUN else 120
train_loader = DataLoader(WindowDataset(X_train, y_train), batch_size=64, shuffle=True)
torch_lstm = TorchRNNForecaster('lstm', input_size=1, hidden_size=HIDDEN)
torch_gru = TorchRNNForecaster('gru', input_size=1, hidden_size=HIDDEN)
hist_torch_lstm = train_torch_model(
torch_lstm,
train_loader,
epochs=EPOCHS_TORCH,
lr=1e-3,
max_grad_norm=1.0,
device=device,
verbose_every=10,
)
hist_torch_gru = train_torch_model(
torch_gru,
train_loader,
epochs=EPOCHS_TORCH,
lr=1e-3,
max_grad_norm=1.0,
device=device,
verbose_every=10,
)
fig = go.Figure()
fig.add_trace(go.Scatter(y=hist_lstm_np, mode='lines', name='NumPy LSTM'))
fig.add_trace(go.Scatter(y=hist_gru_np, mode='lines', name='NumPy GRU'))
fig.add_trace(go.Scatter(y=hist_torch_lstm, mode='lines', name='Torch LSTM'))
fig.add_trace(go.Scatter(y=hist_torch_gru, mode='lines', name='Torch GRU'))
fig.update_layout(title='Training loss comparison', xaxis_title='epoch', yaxis_title='MSE')
fig.show()
device: cpu
epoch 1/40 | loss=0.948046
epoch 10/40 | loss=0.205258
epoch 20/40 | loss=0.078450
epoch 30/40 | loss=0.058539
epoch 40/40 | loss=0.046161
epoch 1/40 | loss=0.924206
epoch 10/40 | loss=0.171874
epoch 20/40 | loss=0.076581
epoch 30/40 | loss=0.070679
epoch 40/40 | loss=0.067584
y_pred_test_torch_lstm = unscale(predict_torch(torch_lstm, X_test, device=device))[:, 0]
y_pred_test_torch_gru = unscale(predict_torch(torch_gru, X_test, device=device))[:, 0]
fig = go.Figure()
fig.add_trace(go.Scatter(x=t_test, y=y_true_test, mode="lines", name="true"))
fig.add_trace(go.Scatter(x=t_test, y=y_pred_test_lstm, mode="lines", name="NumPy LSTM"))
fig.add_trace(go.Scatter(x=t_test, y=y_pred_test_gru, mode="lines", name="NumPy GRU"))
fig.add_trace(go.Scatter(x=t_test, y=y_pred_test_torch_lstm, mode="lines", name="Torch LSTM"))
fig.add_trace(go.Scatter(x=t_test, y=y_pred_test_torch_gru, mode="lines", name="Torch GRU"))
fig.update_layout(
title="One-step-ahead predictions (test region)",
xaxis_title="time index",
yaxis_title="value",
legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1),
)
fig.show()
6) Pitfalls + diagnostics (practical)#
Scale your data: unscaled series often makes training unstable.
Exploding gradients: use gradient clipping (we did).
Sequence length: too short → misses long seasonality; too long → harder optimization.
Train/test split: split by time, not random shuffle.
Rollout error accumulation: one-step models can drift when rolled out recursively.
If you want a stronger forecaster:
predict multiple future steps (seq2seq)
add exogenous features (calendar, holidays, covariates)
use teacher forcing / scheduled sampling
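As a starting point for the first idea, a direct multi-step variant only needs a wider head. A hypothetical sketch (the class name and `horizon` are illustrative); the targets would then be windows of the next `horizon` values instead of a single scalar:

```python
import torch
from torch import nn

class DirectMultiStepForecaster(nn.Module):
    """Predict the next `horizon` values at once from the last hidden state."""

    def __init__(self, input_size: int = 1, hidden_size: int = 32, horizon: int = 10):
        super().__init__()
        self.rnn = nn.GRU(input_size, hidden_size, batch_first=True)
        self.head = nn.Linear(hidden_size, horizon)  # one output per future step

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        out, _ = self.rnn(x)
        return self.head(out[:, -1, :])  # (batch, horizon)

model = DirectMultiStepForecaster(horizon=10)
pred = model(torch.randn(4, 60, 1))
print(pred.shape)  # torch.Size([4, 10])
```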
7) Exercises#
Increase the noise and compare LSTM vs GRU stability.
Add a second seasonal component with a longer period and test different SEQ_LEN values.
Predict multiple steps ahead (e.g. the next 10 points) and compare direct vs recursive forecasting.
Add dropout (PyTorch) and see how it affects rollout drift.
References#
Hochreiter & Schmidhuber (1997): Long Short-Term Memory
Cho et al. (2014): Learning Phrase Representations using RNN Encoder–Decoder (introduces GRU)
Goodfellow, Bengio, Courville: Deep Learning (sequence models chapters)